iT邦幫忙

2023 iThome 鐵人賽

DAY 19
0
Modern Web

FastAPI 如何 Fast ? 框架入門、實例、重構與測試系列 第 19

[Day19] OAuth2 實例:Authorize Dependency 、 權限管理

  • 分享至 

  • xImage
  •  

[Day19] OAuth2 實例:Authorize Dependency 、 權限管理

本次的程式碼與目錄結構可以參考 FastAPI Tutorial : Day19 branch

回顧

我們在 Day17 完成 hash password 的實作
Day18 完成 JWT token 的實作

今天我們會完成整體 OAuth2 password login 的實作
包括 Authorize Dependency權限管理

OAuth2 Login 實作

因為我們在註冊 User 時,都是以 Email 作為 username
所以在 Login 時,也要以 Email 作為 username

並且我們需要在 crud/user.py 中新增 get_user_in_db
讓我們只取出 User 中的 usernamepassword

crud/user.py

#  ...
class UserCrudManager:
    # ...
    async def get_user_in_db(self,email: str,db_session:AsyncSession=None) -> UserSchema.UserInDB :
        stmt = select(UserModel.id,UserModel.name,UserModel.password).where(UserModel.email == email)
        result = await db_session.execute(stmt)
        user = result.first()
        if user:
            return user
            
        return None
# ...

也可以順便加上 UserInDB 的 schema
讓我們在 API Endpoint 知道 get_user_in_db 回傳的資料格式

schemas/user.py


# ...

class UserInDB(BaseModel):
    id: int
    name: str
    password: str

接著我們就可以在 api/auth.py 中實作 Login Endpoint
先透過 get_user_in_db 取得 Userusernamepassword
如果沒有該 User,就先丟出 HTTPException

api/auth.py


@router.post("/login",response_model=Token)
async def login(form_data: login_form_schema):

    user_in_db:UserInDB = await UserCrud.get_user_in_db(form_data.username)

    if user_in_db is None:
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"}
        )

    # ...

再接著檢查密碼是否正確
如果也沒問題,就回傳一組新的 JWT token

api/auth.py

async def login(form_data: login_form_schema):
    # ...

    if not verify_password(form_data.password,user_in_db.password):
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"}
        )
    
    return await create_token_pair({"username": user_in_db.name},{"username": user_in_db.name})

測試 Login Endpoint

先創建一個測試 User

create user

以正確的帳號密碼來測試
correct login

以錯誤密碼來測試
incorrect password

以錯誤帳號來測試

incorrect username

Authorize Dependency

針對一些需要登入才能使用的 API Endpoint
我們應該要帶入 JWT token 來驗證使用者身份

這邊我們使用 fastapi.security 中的 OAuth2PasswordBearer
來作為 Authorize Dependency
這邊先以 update_user API Endpoint 為例

api/user.py

@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
async def update_user(
    newUser: UserSchema.UserUpdate,
    user_id:int=Depends(check_user_id),
    token:str = Depends(OAuth2PasswordBearer(tokenUrl="api/auth/login")) # <----- 新增
    ):
    # ...

在 Swagger UI 中,我們可以看到 update_user API Endpoint 被加上了鎖頭
lock

並且我們按下鎖頭後,就可以輸入 Username 和 Password
Swagger UI 會幫我們帶入 tokenUrl 中的 URL
並在 Authorization header 中帶入 JWT token

authorize

當我們輸入正確的帳號密碼後,就可以成功打 update_user API Endpoint
update user

權限管理

但我們還遇到一個嚴重的問題
就是任何有登入的 User都可以使用 update_user API Endpoint

所以我們應該要在 update_user API Endpoint 中檢查 JWT token 中的 User 與 user_id 是否相同
又因為我們的 User.name 可以重複
所以我們可以在 JWT token 中加入 User.id

api/auth.py

@router.post("/login",response_model=Token)
async def login(form_data: login_form_schema):

    # ...

    return await create_token_pair(
        {"username": user_in_db.name, "id": user_in_db.id},
        {"username": user_in_db.name, "id": user_in_db.id},
    )

@router.post("/refresh",response_model=Token)
async def refresh(refersh_data: RefreshRequest):
    # ...
    u_id:int = payload.get("id")
    if username is None or u_id is None:
        raise  exception_invalid_token

    return await create_token_pair(
        {"username": username , "id": u_id},
        {"username": username , "id": u_id}
    )

回到 update_user
我們可以透過 payload 來取得 usernameid
並且檢查 id 是否與 user_id 相同
如果不相同,就回傳 403 Permission Denied

api/user.py

@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
# ...
    payload = await verify_access_token(token)
    
    if payload.get("id") != user_id:
        raise HTTPException(status_code=403, detail="Permission denied")

    # ...

當我們以 test 用戶的 Token 去打 PUT /api/users/1 ( user1 用戶) 時
就會回傳 403 Permission Denied

permission denied

改為打自己 ( test 用戶) 的 PUT /api/users/2
就可以成功更新

update user 2

Get Current User Dependency

對於需要 Authorize 的 API Endpoint
我們可以透過 get_current_user 來簡化取得當前 User 的過程
讓我們不用再所有的 Route 都寫上 token:str = Depends(OAuth2PasswordBearer(tokenUrl="api/auth/login"))payload = await verify_access_token(token)

auth/utils.py

from fastapi import HTTPException

from crud.users import UserCrudManager
from schemas.auth import oauth2_token_scheme
from auth.jwt import verify_access_token

UserCrud = UserCrudManager()

async def get_current_user(token = oauth2_token_scheme ):
    payload = await verify_access_token(token)
    
    user_id = int(payload.get("id"))
    user = await UserCrud.get_user_by_id(user_id)

    if user is None:
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"}
        )
    
    return user

接下來需要 Authorize 的 API Endpoint
都可以直接使注入get_current_user Dependency
來取得當前登入的 User

api/user.py

@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
async def update_user(
    newUser: UserSchema.UserUpdate,
    user_id:int=Depends(check_user_id),
    user = Depends(get_current_user) 
):
    if user.id != user_id:
        raise HTTPException(status_code=403, detail="Permission denied")

    # ...

就只需要 user = Depends(get_current_user)
就可以取得當前登入的 User
再判斷 user.id 是否與 user_id 相同即可

總結

今天我們完成了 OAuth2 password login 的實作
並且實作了 Authorize Dependency 和 權限管理
也完成 JWT token 的 Refresh 換發機制

明天我們把到目前的專案整理一下
為目前 OAuth2 password login 的實作做一個總結

Reference

FastAPI : OAuth2 with Password (and hashing), Bearer with JWT tokens


上一篇
[Day18] OAuth2 實例: OAuth2 Schema & JWT
下一篇
[Day20] OAuth2 實例:實作總結
系列文
FastAPI 如何 Fast ? 框架入門、實例、重構與測試31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言